AlphaGo-Zero Gomoku Implementation Notes
The code is at the bottom; if it helps you, please give it a star ☺☺☺
Gomoku can be implemented with essentially the same algorithm as Go. However, the two games are on entirely different levels in terms of the complexity of computing a move, the performance demanded by Monte Carlo simulation, the variability of game play, and the difficulty of getting the model to converge. Respect to Google (DeepMind) for that.
Results after 1,300 games of self-play
Results after 2,400 games of self-play
Notes from reading and implementing the paper
- Key characteristics of AlphaGo Zero (hereafter "Zero"):
    - Unlike AlphaGo Lee, Zero is trained purely through self-play, without using any existing game data for supervised learning
    - Feature extraction is also relatively simple
    - Only a single neural network (hereafter "net") is used, consisting of a front-end feature network plus policy and value heads
    - The Monte Carlo tree search (hereafter "mcts") logic is relatively simple
Reinforcement learning in Zero
The net takes as input binary feature planes (0s and 1s) encoding the historical and current board positions, and outputs a policy p and a value v, where p gives the probability of playing at each point on the board and v estimates the probability that the current player wins from the current position.
Training is done by self-play. Every move is decided by mcts combined with the net; this mcts+net combination acts as a strengthened policy player and, trained on the outcomes of its own games, speeds up convergence.
The mcts+net procedure: 1) select a child node; 2) expand the leaf node, choosing the locally best option; 3) update the parent nodes; 4) repeat steps 1-3, then play a move.
Each node stores its visit count N, the prior probability P given by the net, and the mean action value Q.
A simulation starts at the root node and recursively selects the child with the largest PUCT value until a leaf is reached; the value there (from the net, or the actual outcome if the game has been won or drawn) is then propagated back along the same path, updating each node's N and Q.
When the simulations are finished, the child with the largest N is chosen as the next move.
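For reference, these are the two formulas from the paper (the first matches the `uct()` method in the code below; the second is the visit-count distribution controlled by the temperature τ mentioned in the details list):

$$
a_t = \arg\max_a \left( Q(s, a) + c_{\mathrm{puct}} \, P(s, a) \, \frac{\sqrt{\sum_b N(s, b)}}{1 + N(s, a)} \right),
\qquad
\pi(a \mid s_0) = \frac{N(s_0, a)^{1/\tau}}{\sum_b N(s_0, b)^{1/\tau}}
$$

As τ approaches 0, π collapses onto the most-visited child, which is exactly "pick the child with the largest N".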
At every move, record the current board state, the policy π that mcts+net produced for that move, and the current player; these records are used to train the net.
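The value target z for each recorded step is the final game result seen from that step's player. The conversion code is not part of this excerpt; a minimal sketch with hypothetical names (`assign_rewards`, `winner`) could look like this:

```python
def assign_rewards(records, winner):
    """records: list of (state, pi, player) collected during one self-play game.
    winner: the winning player, or None for a draw (hypothetical convention)."""
    samples = []
    for state, pi, player in records:
        if winner is None:
            z = 0.0                        # draw
        else:
            z = 1.0 if player == winner else -1.0
        samples.append((state, pi, z))     # matches the (state, pi, reward) triples used by sample()
    return samples
```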
Details
- Temperature: set τ = 1 for the first 30 moves of each self-play game; otherwise, including when evaluating model performance, use a very small value
- Mix Dirichlet noise into the root node's priors P so that more candidate moves get explored
- After each move, the selected child node becomes the new root of the tree
- The input features for the net are 8 planes containing only the black stones, 8 planes containing only the white stones, and a plane for the current player to move (the code additionally adds a plane marking the last move)
- The loss combines a mean-squared error term and a cross-entropy term (the formula from the paper is reproduced after this list)
- During training, the input features are augmented by rotating and flipping the planes
- In mcts, the per-move probabilities obtained from the net need to go through a softmax (the policy head outputs log-probabilities, so the search exponentiates them)
- In mcts, watch the sign of the value when updating parent nodes
- Unlike Go, deciding the result in Gomoku only requires looking at the side that made the last move
- The neural network is not the decisive component of Zero: even with a uniform prior over moves, reasonable play emerges after enough simulations
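The figure referenced above is the loss from the AlphaGo Zero paper: mean-squared error on the value, cross-entropy on the policy, plus an L2 regularization term:

$$
l = (z - v)^2 - \boldsymbol{\pi}^{\top} \log \mathbf{p} + c \, \lVert \theta \rVert^2
$$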
Code walkthrough
Building the Gomoku board
class Board(object):
    def __init__(self,
                 size=SIZE,
                 hist_num=HISTORY,
                 c_action=-1,
                 player=BLACK):
        self.size = size
        self.c_action = c_action            # last action as a flattened index; -1 means no move yet
        self.hist_num = hist_num
        self.valid_moves = list(range(size**2))
        self.invalid_moves = []
        self.board = np.zeros([size, size])
        self.c_player = player              # player to move
        self.players = {"black": BLACK, "white": WHITE}
        # per-player stacks of the last `hist_num` board planes
        # BLACK -> 0 | WHITE -> 1
        self.history = [np.zeros((hist_num, size, size)),
                        np.zeros((hist_num, size, size))]
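The constants used throughout these snippets (SIZE, HISTORY, BLACK, WHITE, CPUCT, ...) are defined elsewhere in the repository. Purely as an illustration, an assumed configuration that is consistent with the code shown here might be:

```python
import torch

# Assumed values for illustration only; the real ones live elsewhere in the repo.
SIZE = 15                        # 15x15 Gomoku board
HISTORY = 8                      # 8 history planes per color (see the feature list above)
BLACK, WHITE = 1, 2              # any pair with WHITE - BLACK == 1 fits gen_state()
CPUCT = 5                        # exploration constant in the PUCT formula
IND = 2 * HISTORY + 2            # number of input planes produced by gen_state()
OUTD = SIZE ** 2                 # one policy output per board point
RES_BLOCK_FILLTERS = 128         # residual block width (name kept as in the code)
BLOCKS = 10                      # number of residual blocks
USECUDA = torch.cuda.is_available()
```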
Checking whether one side has won. Idea: centred on the last move, look along the horizontal, vertical and both diagonal directions for five stones of the same colour.
def is_game_over(self, player=None):
    # Check the four directions through the last move for five in a row.
    # Out-of-board coordinates are handled by _get_piece (see the sketch below).
    x, y = self.c_action // self.size, self.c_action % self.size
    if player is None:
        player = self.c_player
    # five in a row along the x direction (same y)
    for i in range(x - 4, x + 5):
        if self._get_piece(i, y) == self._get_piece(i + 1, y) == self._get_piece(i + 2, y) == self._get_piece(i + 3, y) == self._get_piece(i + 4, y) == player:
            return True
    # five in a row along the y direction (same x)
    for j in range(y - 4, y + 5):
        if self._get_piece(x, j) == self._get_piece(x, j + 1) == self._get_piece(x, j + 2) == self._get_piece(x, j + 3) == self._get_piece(x, j + 4) == player:
            return True
    # the (+1, +1) diagonal
    j = y - 4
    for i in range(x - 4, x + 5):
        if self._get_piece(i, j) == self._get_piece(i + 1, j + 1) == self._get_piece(i + 2, j + 2) == self._get_piece(i + 3, j + 3) == self._get_piece(i + 4, j + 4) == player:
            return True
        j += 1
    # the (-1, +1) diagonal
    i = x + 4
    for j in range(y - 4, y + 5):
        if self._get_piece(i, j) == self._get_piece(i - 1, j + 1) == self._get_piece(i - 2, j + 2) == self._get_piece(i - 3, j + 3) == self._get_piece(i - 4, j + 4) == player:
            return True
        i -= 1
    return False
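`_get_piece` is not shown in this excerpt. For the sliding windows above to be safe near the board edges it has to return a value that never matches a player for out-of-board coordinates; a plausible sketch (the sentinel value is an assumption):

```python
def _get_piece(self, x, y):
    # return the stone at (x, y), or a sentinel for coordinates off the board
    if 0 <= x < self.size and 0 <= y < self.size:
        return self.board[x, y]
    return -1
```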
Extracting the features of the current position, including the historical boards, the current player and the last move
def gen_state(self):
    # one plane marking the last move
    to_action = np.zeros((1, self.size, self.size))
    to_action[0][self.c_action // self.size,
                 self.c_action % self.size] = 1.
    # one constant plane encoding the player to move
    to_play = np.full((1, self.size, self.size), self.c_player - BLACK)
    # stack: hist_num black planes + hist_num white planes + to_play + to_action
    state = np.concatenate(self.history + [to_play, to_action], axis=0)
    return state
Tree node
The tree node used in mcts; see the comments in the code below.
class TreeNode(object):
    def __init__(self,
                 action=None,
                 props=None,
                 parent=None):
        self.parent = parent
        self.action = action      # the move that leads from the parent to this node
        self.children = []
        self.N = 0                # visit count
        self.Q = .0               # mean action value
        self.W = .0               # total action value
        self.P = props            # prior probability

    def is_leaf(self):
        return len(self.children) == 0

    def select_child(self):
        # pick the child with the largest PUCT score
        index = np.argmax(np.asarray([c.uct() for c in self.children]))
        return self.children[index]

    def uct(self):
        # Q(s, a) + U(s, a), with U = CPUCT * P * sqrt(N_parent) / (1 + N)
        return self.Q + self.P * CPUCT * (np.sqrt(self.parent.N) / (1 + self.N))

    def expand_node(self, props):
        # create one child per move with a non-zero prior
        self.children = [TreeNode(action=action, props=p, parent=self)
                         for action, p in enumerate(props) if p > 0.]

    def backup(self, v):
        self.N += 1
        self.W += v
        self.Q = self.W / self.N
mcts
Search: 1) walk down from the root node until a leaf is reached; 2) feed the current position to the neural network to get move probabilities and a value estimate; 3) propagate the result from the leaf back up to the root, updating the nodes along the way.
def search(self, borad, node, temperature=.001):
    self.borad = borad
    self.root = node
    for _ in range(self.ms_num):
        node = self.root
        borad = self.borad.clone()

        # 1. selection: walk down to a leaf, replaying the moves on the cloned board
        while not node.is_leaf():
            node = node.select_child()
            borad.move(node.action)
            borad.trigger()

        # 2. evaluation: after trigger() the board is viewed from the opponent's side,
        #    so be careful with the sign of the value
        value, props = self.net(
            to_tensor(borad.gen_state(), unsqueeze=True))
        value = to_numpy(value, USECUDA)[0]
        props = np.exp(to_numpy(props, USECUDA))

        # add dirichlet noise for root node
        if node.parent is None:
            props = self.dirichlet_noise(props)

        # mask illegal moves and re-normalize
        props[borad.invalid_moves] = 0.
        total_p = np.sum(props)
        if total_p > 0:
            props /= total_p

        # 3. expansion: winner, draw or continue
        if borad.is_draw():
            value = 0.
        else:
            done = borad.is_game_over(player=borad.last_player)
            if done:
                # the side that just moved has won, so the player to move scores -1
                value = -1.
            else:
                node.expand_node(props)

        # 4. backup: propagate the value to the root, flipping the sign at each level
        while node is not None:
            value = -value
            node.backup(value)
            node = node.parent
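`dirichlet_noise`, `to_tensor` and `to_numpy` are small helpers defined elsewhere in the repo. For the noise, the paper mixes the root priors as P(s, a) = (1 − ε) p_a + ε η_a with η ∼ Dir(α) (ε = 0.25 and α = 0.03 for Go); a sketch under that assumption, with α left as a guess for the smaller Gomoku board:

```python
def dirichlet_noise(self, props, eps=0.25, alpha=0.3):
    # mix the network priors with Dirichlet noise to encourage exploration at the root
    noise = np.random.dirichlet([alpha] * len(props))
    return (1 - eps) * props + eps * noise
```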
Neural network
Feature network
class Feature(nn.Module):
def __init__(self,
ind=IND,
outd=RES_BLOCK_FILLTERS):
super().__init__()
self.layer = nn.Sequential(
nn.Conv2d(ind, outd,
stride=1,
kernel_size=3,
padding=1,
bias=False),
nn.BatchNorm2d(outd),
nn.ReLU(),
)
self.encodes = nn.ModuleList([ResBlockNet() for _ in range(BLOCKS)])
def forward(self, x):
x = self.layer(x)
for enc in self.encodes:
x = enc(x)
return x
Policy network
class Policy(nn.Module):
def __init__(self,
ind=RES_BLOCK_FILLTERS,
outd=OUTD,
kernels=2):
super().__init__()
self.out = outd * kernels
self.conv = nn.Sequential(
nn.Conv2d(ind, kernels, kernel_size=1),
nn.BatchNorm2d(kernels),
nn.ReLU(),
)
self.linear = nn.Linear(kernels * outd, outd)
self.linear.weight.data.uniform_(-.1, .1)
def forward(self, x):
x = self.conv(x)
x = x.view(-1, self.out)
x = self.linear(x)
return F.log_softmax(x, dim=-1)
Value network
class Value(nn.Module):
def __init__(self,
ind=RES_BLOCK_FILLTERS,
outd=OUTD,
hsz=256,
kernels=1):
super().__init__()
self.outd = outd
self.conv = nn.Sequential(
nn.Conv2d(ind, kernels, kernel_size=1),
nn.BatchNorm2d(kernels),
nn.ReLU(),
)
self.linear = nn.Sequential(
nn.Linear(outd, hsz),
nn.ReLU(),
nn.Linear(hsz, 1),
nn.Tanh(),
)
self._reset_parameters()
def forward(self, x):
x = self.conv(x)
x = x.view(-1, self.outd)
return self.linear(x)
def _reset_parameters(self):
for layer in self.modules():
if type(layer) == nn.Linear:
layer.weight.data.uniform_(-.1, .1)
Full network
class Net(nn.Module):
    def __init__(self):
        super().__init__()
        self.feat = Feature()
        self.value = Value()
        self.policy = Policy()

    def forward(self, x):
        feats = self.feat(x)
        winners = self.value(feats)      # value v in [-1, 1]
        props = self.policy(feats)       # log-probabilities over board points
        return winners, props

    def save_model(self, path="model.pt"):
        torch.save(self.state_dict(), path)

    def load_model(self, path="model.pt", cuda=True):
        # map_location lets a GPU-trained checkpoint load on a CPU-only machine
        self.load_state_dict(torch.load(path, map_location="cpu"))
        if cuda:
            self.cuda()
        else:
            self.cpu()
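The training loop itself is not part of this excerpt. A minimal sketch of one optimization step with the `Net` above, assuming batches of `(state, pi, z)` tensors produced from the sampling code below and an Adam optimizer (the L2 part of the loss is delegated to `weight_decay`):

```python
import torch
import torch.nn.functional as F

def train_step(net, optimizer, states, pis, zs):
    # states: (B, IND, SIZE, SIZE), pis: (B, SIZE*SIZE), zs: (B,)
    values, log_props = net(states)
    value_loss = F.mse_loss(values.view(-1), zs)          # (z - v)^2
    policy_loss = -(pis * log_props).sum(dim=1).mean()    # -pi^T log p
    loss = value_loss + policy_loss
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    return loss.item()

# optimizer = torch.optim.Adam(net.parameters(), lr=1e-3, weight_decay=1e-4)
```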
Sampling
def sample(self, datas):
    # Augment each (state, pi, reward) sample with the 8 symmetries of the board
    # (4 rotations x 2 reflections); the reward is invariant under these transforms.
    for state, pi, reward in datas:
        c_state = state.copy()
        c_pi = pi.copy()
        for i in range(4):
            # rotate the feature planes and the policy in the same way
            c_state = np.array([np.rot90(s, i) for s in c_state])
            c_pi = np.rot90(c_pi.reshape(SIZE, SIZE), i)
            self.sample_data.append([c_state, c_pi.flatten(), reward])
            # mirror the rotated position
            c_state = np.array([np.fliplr(s) for s in c_state])
            c_pi = np.fliplr(c_pi)
            self.sample_data.append([c_state, c_pi.flatten(), reward])
    return len(datas)